package com.bjy.qa.agent.tester.handler.perf;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.bjy.qa.agent.interceptor.perf.BeforeInterceptor;
import com.bjy.qa.agent.interceptor.InterceptorConf;
import com.bjy.qa.agent.context.Context;
import com.bjy.qa.agent.context.HandlerContextMap;
import com.bjy.qa.agent.enumtype.*;
import com.bjy.qa.agent.exception.BreakException;
import com.bjy.qa.agent.exception.MyException;
import com.bjy.qa.agent.model.*;
import com.bjy.qa.agent.response.Response;
import com.bjy.qa.agent.tester.LogUtil;
import com.bjy.qa.agent.tools.SpringTool;
import com.bjy.qa.agent.tools.python.Python3Helper;
import com.bjy.qa.agent.tools.python.PythonHelper;
import com.bjy.qa.agent.tools.python.PythonVersion;
import com.bjy.qa.agent.tools.resttemplate.RestTemplateHelper;
import com.bjy.qa.agent.tools.security.TripleDESUtil;
import com.bjy.qa.agent.transport.websocket.IWebSocketService;
import com.jcraft.jsch.JSchException;
import com.rslai.commons.ssh.*;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.ResponseEntity;

import java.io.IOException;
import java.lang.reflect.Constructor;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeoutException;

import static com.bjy.qa.agent.tools.security.TripleDESUtil.decrypt;

/**
 * PerfTester 步骤执行器
 */
public class PerfTesterStepHandler {
    private static final Logger logger = LoggerFactory.getLogger(PerfTesterStepHandler.class);

    private static final PerfConf perfConf; // 性能测试 配置
    private static final String perfTestscriptDir = "/perf-testscript"; // 性能测试脚本目录

    public LogUtil logUtil = new LogUtil(); // 设置上报运行结果工具类
    private Context context; // 将当前套件的 Context
    private int holdTime = 0; // 步骤间隔时间
    private RunStatus status = RunStatus.PASSED; // 测试任务运行状态

    private Response response = new Response(); // 保存返回数据(也可以理解成切面 - 某个步骤需要改变就改，不需要就不改，需要校验就校验)

    private static List<Constructor> beforeInterceptors = new LinkedList<>(); // 前置拦截器 列表

    static {
        perfConf = SpringTool.getBean(PerfConf.class);
        logger.info("性能测试 配置信息：{}", perfConf);

        InterceptorConf interceptorConf = SpringTool.getBean(InterceptorConf.class); // 获得 拦截器 配置

        // 遍历加载 tester 的前置拦截器
        for (String className : interceptorConf.getPerfBeforeInterceptors()) {
            try {
                if (StringUtils.isNotBlank(className)) {
                    Class clazz = Class.forName(className);
                    Constructor constructor = clazz.getConstructor(Map.class, PerfStep.class); // 获取构造器对象
                    beforeInterceptors.add(constructor);
                    logger.info("加载 tester 的前置拦截器: {}", className);
                }
            } catch (Throwable e) {
                e.printStackTrace();
                logger.error("加载 tester 的前置拦截器错误，className：" + className + "\t" + e.getMessage());
                throw new MyException("加载 tester 的前置拦截器错误，className：" + className + "\n" + e.getMessage());
            }
        }
    }

    /**
     * 构造函数
     * @param catalogType 分类类型 - 上报日志类型
     * @param caseId 测试用例 ID
     * @param iterationCount 迭代次数
     * @param resultId 测试任务结果 ID
     * @param type 运行类型（DEBUGGING、TESTING）
     * @param sessionId 当 type=DEBUGGING 需要传入sessionId，服务器收到上报后通过这个 sessionId 发送到对应的 websocket
     */
    public PerfTesterStepHandler(int catalogType, int caseId, String iterationCount, int resultId, RunningModeStatus type, String sessionId) {
        logUtil.resultId = resultId;
        logUtil.catalogType = catalogType;
        logUtil.caseId = caseId;
        logUtil.iterationCount = iterationCount;
        logUtil.type = type;
        logUtil.sessionId = sessionId;

        // 根据 resultId 得到一个 Context，要是没有则创建一个新的 Context
        context = HandlerContextMap.getContext(resultId);
        if (context == null) {
            context = new Context();
            HandlerContextMap.addContext(resultId, context);
        }
    }

    /**
     * 关闭 driver
     */
    public void closeDriver() {
        // 遍历关闭 WebSocket
        for (Map.Entry<String, IWebSocketService> entry : this.context.getWebSocketServiceMap().entrySet()) {
            String key = entry.getKey();
            IWebSocketService iWebSocketService = entry.getValue();

            iWebSocketService.close("关闭 WebSocket: " + key); // 关闭 WebSocket
            iWebSocketService = null; // 置空
            this.context.getWebSocketServiceMap().remove(key); // 从 WebSocketServiceMap 中移除
        }
    }

    /**
     * 设置全局参数
     * @param jsonObject 全局参数
     */
    public void setGlobalParams(JSONObject jsonObject) {
        if (jsonObject == null) {
            return;
        }
        for (Map.Entry<String, Object> entry : jsonObject.entrySet()) {
            if (entry.getValue().toString().indexOf(TripleDESUtil.CIPHERTEXT_PREFIX) == 0) {
                this.context.addGlobalParas(entry.getKey(), decrypt(entry.getValue().toString())); // 用户参数需要解密
            } else {
                this.context.addGlobalParas(entry.getKey(), entry.getValue()); // 普通参数
            }
        }
    }

    public LogUtil getLog() {
        return logUtil;
    }

    /**
     * 执行步骤
     * @param stepJSON 步骤 JSON
     * @param handleDes 步骤描述
     * @throws Throwable
     */
    public void runStep(JSONObject stepJSON, HandleDes handleDes) throws Throwable {
        PerfStep step = stepJSON.getObject("step", PerfStep.class); // 得到步骤

        Thread.sleep(holdTime * 1000); // 步骤间隔

        before(handleDes, this.context.getGlobalParas(), step);

        if (step.getCtype() == CatalogType.PERFORMANCE_TEST_SCRIPT) {
            handleDes.setStepDesc("运行性能测试脚本");
            performance(handleDes, step);
        } else {
            unknown(handleDes, step);
        }

        afterHandle(step, handleDes);
    }

    /**
     * 前置处理
     * @param globalParas
     * @param step
     */
    private void before(HandleDes handleDes, Map<String, Object> globalParas, PerfStep step) {
        // 遍历执行 tester 的前置拦截器
        for (Constructor constructor : beforeInterceptors) {
            try {
                BeforeInterceptor beforeInterceptor = (BeforeInterceptor) constructor.newInstance(globalParas, step); // 利用构造器创建一个对象
                beforeInterceptor.execute();
            } catch (Exception e) {
                e.printStackTrace();
                throw new BreakException("运行 tester 的前置拦截器错误，className：" + constructor.getName() + "\t" + e.getMessage());
            }
        }
    }

    /**
     * 后置处理
     * @param step 步骤
     * @param handleDes handleDes
     * @throws Throwable
     */
    public void afterHandle(PerfStep step, HandleDes handleDes) throws Throwable {
        String stepDesc = handleDes.getStepDesc();
        String stepType = handleDes.getStepType();
        String detail = handleDes.getDetail();
        Throwable e = handleDes.getE();
        if (e == null) {
            logUtil.sendStepLog(RunStatus.PASSED, stepDesc, stepType, detail);
        } else {
            logUtil.sendStepLog(RunStatus.ERROR, stepDesc, stepType + " 异常！", detail);
            setResultDetailStatus(RunStatus.ERROR);
            exceptionLog(e);
            throw e;
        }
    }

    /**
     * 异常信息上报
     * @param e
     */
    public void exceptionLog(Throwable e) {
        logUtil.sendStepLog(RunStatus.WARN, "", "", "异常信息： " + e.fillInStackTrace().toString());
    }

    /**
     * 发送测试执行状态
     */
    public void sendStatus() {
        logUtil.sendStatusLog(status);
    }

    /**
     * 设置测试执行状态
     * @param status 测试执行状态
     */
    public void setResultDetailStatus(RunStatus status) {
        if (status.getValue() > this.status.getValue()) {
            this.status = status;
        }
    }

    /**
     * 未实现的 step
     * @param handleDes handleDes
     * @param step step 信息
     */
    public void unknown(HandleDes handleDes, PerfStep step) {
        handleDes.setStepType("未实现的 step");
        handleDes.setE(new MyException(step.toString()));

        logger.error("未实现的 step: {}", step);
    }

    /**
     * 性能测试
     * @param handleDes 步骤描述
     * @param perfStep 性能测试用例步骤
     */
    public void performance(HandleDes handleDes, PerfStep perfStep) {
        handleDes.setStepType(perfStep.getScriptType().getName());

        perfStep.setName(perfStep.getName().replace("\\", "-").replace("/", "-").replace(" ", "-").replace("\t", "-")); // 脚本名称，将 \ / 空格 table 字符替换成 -

        try {
            switch (perfStep.getScriptType()) {
                case LOCUST:
                    this.locust(handleDes, perfStep);
                    break;
                case JMETER:
                    this.jmeter();
                    break;
                case LOAD_RUNNER:
                    this.loadRunner();
                    break;
                default:
                    throw new BreakException(String.format("不识别的脚本类型 scriptType: %s", perfStep.getScriptType()));
            }
            handleDes.setDetail("完成");
        } catch (Exception e) {
            handleDes.setE(e);
        }
    }

    /**
     * 性能测试 - LOCUST
     * @param handleDes 步骤描述
     * @param perfStep 性能测试用例步骤
     */
    private void locust(HandleDes handleDes, PerfStep perfStep) {
        logger.info("性能测试，运行 LOCUST 脚本 scriptName: {}", perfStep.getName());

        // 检查压力机环境
        logUtil.sendStepLog(RunStatus.INFO, "开始检查压力机环境", "", "");
        logger.info("开始检查压力机环境......");
        for (Generator generator : perfStep.getGenerators()) {
            switch (generator.getSystemType()) {
                case LINUX:
                    this.envCheckLocustLinux(generator, perfStep);
                    break;
                case MAC:
//                    break;
                case WINDOWS:
                    throw new BreakException(String.format("暂未实现的压力机系统类型 systemType: %s", generator.getSystemType()));
//                    break;
                default:
                    throw new BreakException(String.format("不识别的压力机系统类型 systemType: %s", generator.getSystemType()));
            }
        }
        logger.info("检查压力机环境，完成");
        logUtil.sendStepLog(RunStatus.INFO, "检查压力机环境", "", "成功");

        // 运行性能测试脚本
        logUtil.sendStepLog(RunStatus.INFO, "开始运行性能测试脚本", "", "");
        logger.info("开始运行性能测试脚本......");
        int workerCount = perfStep.getGenerators().size(); // 需要有多少个工作节点
        String masterIp = null; // 主节点（master）ip
        if (workerCount > 0) {
            JSONObject extra = JSON.parseObject(perfStep.getGenerators().get(0).getExtra());
            if (extra.getBoolean("isJumpServer")) {
                masterIp = extra.getString("serverIp");
            } else {
                masterIp = perfStep.getGenerators().get(0).getIp();
            }
        }
        for (Generator generator : perfStep.getGenerators()) {
            switch (generator.getSystemType()) {
                case LINUX:
                    this.runLocustLinux(masterIp, workerCount, generator, perfStep);
                    break;
                case MAC:
//                    break;
                case WINDOWS:
                    throw new BreakException(String.format("暂未实现的压力机系统类型 systemType: %s", generator.getSystemType()));
//                    break;
                default:
                    throw new BreakException(String.format("不识别的压力机系统类型 systemType: %s", generator.getSystemType()));
            }
        }
        logger.info("运行性能测试脚本，完成");
        logUtil.sendStepLog(RunStatus.INFO, "运行性能测试脚本", "", "成功");

        logUtil.sendStatusLog(RunStatus.RUNNING); // 环境检查 + 启动脚本 完成后修改 测试脚本 状态为 运行中

        // 收集性能测试结果
        logUtil.sendStepLog(RunStatus.INFO, "开始收集性能测试结果", "", "");
        logger.info("开始收集性能测试结果......");
        this.collectResults(masterIp, perfStep);
        logger.info("收集性能测试结果，完成");
        logUtil.sendStepLog(RunStatus.INFO, "收集性能测试结果", "", "完成");

        // 强制停止性能测试脚本
        if (this.status == RunStatus.BROKEN) {
            logUtil.sendStepLog(RunStatus.INFO, "开始强制停止性能测试脚本", "", "");
            logger.info("开始强制停止性能测试脚本......");
            for (Generator generator : perfStep.getGenerators()) {
                switch (generator.getSystemType()) {
                    case LINUX:
                        this.forceStopLocustLinux(generator);
                        break;
                    case MAC:
//                    break;
                    case WINDOWS:
                        throw new BreakException(String.format("暂未实现的压力机系统类型 systemType: %s", generator.getSystemType()));
//                    break;
                    default:
                        throw new BreakException(String.format("不识别的压力机系统类型 systemType: %s", generator.getSystemType()));
                }
            }
            logger.info("强制停止性能测试脚本，完成");
            logUtil.sendStepLog(RunStatus.INFO, "强制停止性能测试脚本", "", "完成");
        }
    }

    /**
     * 连接 ssh 服务器
     * @param generator 压力机（load generators）
     * @param sshTimeout ssh 超时时间（单位 ms）。由于有些命令运行时间特别长不返回，所以需要特殊配置。0 表示不设置用默认配置 30s 超时。
     * @return
     * @throws JSchException
     * @throws IOException
     * @throws TimeoutException
     */
    private SSH connectSshServer(Generator generator, int sshTimeout) throws JSchException, IOException, TimeoutException {
        StringBuffer info = new StringBuffer(); // 日志信息
        info.append(String.format("压力机 id: %s, desc: %s, systemType: %s", generator.getId(), generator.getDesc(), generator.getSystemType()));
        JSONObject extra = JSON.parseObject(generator.getExtra());

        SSHConfig sshConfig = new SSHConfig();
        if (sshTimeout != 0) {
            sshConfig.setReadTimeout(sshTimeout);
        }
        sshConfig.setHost(generator.getIp()); // ssh 服务器地址
        info.append(String.format(", host: %s", sshConfig.getHost()));
        sshConfig.setPort(generator.getPort());
        info.append(String.format(", port: %s", sshConfig.getPort()));
        sshConfig.setJumpServer(extra.getBoolean("isJumpServer"));
        info.append(String.format(", isJumpServer: %s", sshConfig.isJumpServer()));
        if (extra.getBoolean("isPasswordAuth")) {
            sshConfig.setUserAuthType(UserAuthTypeEnum.PASSWORD);
            sshConfig.setUsername(extra.getString("userName")); // 登录 ssh 用户名
            sshConfig.setPassword(extra.getString("password")); // 登录 ssh 密码
            info.append(", isPasswordAuth: true");
        } else {
            sshConfig.setUserAuthType(UserAuthTypeEnum.PUBKEY);
            info.append(", isPasswordAuth: false");
        }
        String serverIp = extra.getString("serverIp");

        logger.info(info.toString()); // log 环境检查日志信息
        logUtil.sendStepLog(RunStatus.INFO, "", "", info.toString()); // 上报 环境检查日志信息

        SSH ssh = null;
        SSHResponse sshResponse;
        ssh = new Jsch(sshConfig);

        // 如果是跳板机，进入服务器
        if (sshConfig.isJumpServer()) {
            sshResponse = ssh.execute(serverIp);
            logger.info("进入服务器 serverIp: {}", serverIp);
        }

        // for 执行预执行命令
        for (String cmd : generator.getRunCmd().split("\n")) {
            if (StringUtils.isNotBlank(cmd)) {
                sshResponse = ssh.execute(cmd);
                logger.info("执行预执行命令，cmd: 「{}」, response: 「{}」", sshResponse.getCmd(), sshResponse.getBody());
            }
        }
        return ssh;
    }

    /**
     * 性能测试 - JMETER
     */
    private void jmeter() {
        throw new BreakException("未实现的脚本类型 scriptType: JMETER");
    }

    /**
     * 性能测试 - LOAD_RUNNER
     */
    private void loadRunner() {
        throw new BreakException("未实现的脚本类型 scriptType: LOAD_RUNNER");
    }

    /**
     * 环境检查 Locust - linux
     * @param generator 压力机（load generators）Entity 模型
     * @param perfStep 性能测试用例步骤
     * @return
     */
    private boolean envCheckLocustLinux(Generator generator, PerfStep perfStep) {
        SSH ssh = null;
        try {
            ssh = connectSshServer(generator, 360000); // 连接 ssh 服务器

            logUtil.sendStepLog(RunStatus.INFO, "", "", "检测基础环境。如果没安装 glibc、conda、python 时间可能较长，安装 glibc 预计需要 5 分钟，安装 conda 预计需要 1 分钟，安装 python 预计需要 2 分钟");
            logger.info("初始化 PythonHelper。如果没安装 glibc、conda、python 时间可能较长，安装 glibc 预计需要 5 分钟，安装 conda 预计需要 1 分钟，安装 python 预计需要 2 分钟");
            PythonHelper pythonHelper = new Python3Helper(perfConf.getCondaConfig(), PythonVersion.PY_3_6_8, ssh, generator.getWorkDir(), perfConf.getDebug());

            // 安装依赖的 python 包
            String[] packageList = new String[]{"deepdiff"};
            pythonHelper.installDependPythonPackages(packageList);

            // 创建 性能测试脚本目录
            SSHResponse sshResponse = pythonHelper.executeCommand("mkdir -p " + pythonHelper.getHome() + perfTestscriptDir);
            if (sshResponse.getBody().contains("Permission denied")) {
                throw new BreakException(String.format("进入性能测试脚本目录：cmd: 「%s」, response: 「%s」", sshResponse.getCmd(), sshResponse.getBody()));
            }
            sshResponse = pythonHelper.executeCommand("cd " + pythonHelper.getHome() + perfTestscriptDir);
            if (sshResponse.getBody().contains("No such file or directory")) {
                throw new BreakException(String.format("进入性能测试脚本目录：cmd: 「%s」, response: 「%s」", sshResponse.getCmd(), sshResponse.getBody()));
            }
            sshResponse = pythonHelper.executeCommand("pwd");
            logger.info("进入性能测试脚本目录 dir: {}", sshResponse.getBody());

            // 保存性能测试脚本
            String scriptContent = perfStep.getContent().replace("\"", "\\\""); // " 字符转义成 \"
            sshResponse = pythonHelper.executeCommand(String.format("echo \"%s\" > \"perf_%s_%d_%d.py\"", scriptContent, perfStep.getName(), logUtil.resultId, logUtil.caseId));
            logger.info("生成测试脚本，文件名: perf_{}_{}_{}.py", perfStep.getName(), logUtil.resultId, logUtil.caseId);
            logUtil.sendStepLog(RunStatus.INFO, "", "", String.format("生成测试脚本，文件名: perf_%s_%d_%d.py", perfStep.getName(), logUtil.resultId, logUtil.caseId));

            logUtil.sendStepLog(RunStatus.INFO, "", "", "检测 locust 环境。如果没安装 locust 预计需要 1 分钟");
            logger.info("检测 locust 环境。如果没安装 locust 预计需要 1 分钟");
            if (!pythonHelper.checkLocustInstalled()) {
                pythonHelper.installLocust();
                logger.info("未安装 locust，安装 locust");
            }
        } catch (JSchException e) {
            throw new BreakException(e.toString());
        } catch (IOException e) {
            throw new BreakException(e.toString());
        } catch (TimeoutException e) {
            throw new BreakException(e.toString());
        } finally {
            if (ssh != null) {
                try {
                    ssh.disconnect(); // 释放ssh链接
                } catch (TimeoutException e) {
                    e.printStackTrace();
                }
            }
        }

        return true;
    }

    /**
     * 运行性能测试脚本 Locust - linux
     * @param masterIp 主节点（master）ip
     * @param workerCount 需要有多少个工作节点
     * @param generator 压力机（load generators）Entity 模型
     * @param perfStep 性能测试用例步骤
     * @return
     */
    private boolean runLocustLinux(String masterIp, int workerCount, Generator generator, PerfStep perfStep) {
        SSH ssh = null;
        try {
            ssh = connectSshServer(generator, 0); // 连接 ssh 服务器

            PythonHelper pythonHelper = new Python3Helper(perfConf.getCondaConfig(), PythonVersion.PY_3_6_8, ssh, generator.getWorkDir(), perfConf.getDebug());

            // 进入 性能测试脚本目录
            SSHResponse sshResponse = pythonHelper.executeCommand("cd " + pythonHelper.getHome() + perfTestscriptDir);
            if (sshResponse.getBody().contains("No such file or directory")) {
                throw new BreakException(String.format("进入性能测试脚本目录：cmd: 「%s」, response: 「%s」", sshResponse.getCmd(), sshResponse.getBody()));
            }
            sshResponse = pythonHelper.executeCommand("pwd");
            logger.info("进入性能测试脚本目录 dir: {}", sshResponse.getBody());

            // 运行性能测试脚本
            logUtil.sendStepLog(RunStatus.INFO, "", "", "停止现有 locust: pkill locust");
            logger.info("执行 pkill locust 杀掉旧的 locust");
            pythonHelper.executeCommand("pkill locust");
            int i = 0;
            while (true) {
                sshResponse = pythonHelper.executeCommand("ps -ef | grep --color=never \"locust\"");
                if (!sshResponse.getBody().contains("--master") && !sshResponse.getBody().contains("--worker")) {
                    break;
                }
                if (i == 50) {
                    pythonHelper.executeCommand("pkill -9 locust"); // 在第15秒的时候补一次 pkill -9 杀进程
                }
                if (i++ > 100) {
                    throw new MyException(String.format("pkill locust 失败，超过 30s locust 仍然在运行。执行 %s 命令失败，错误信息：%s", sshResponse.getCmd(), sshResponse.getBody()));
                }
                try {
                    Thread.sleep(300);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
            logger.info("执行 pkill locust 杀掉旧的 locust，成功");
            logUtil.sendStepLog(RunStatus.INFO, "", "", "停止现有 locust，成功");

            /**
             * locust 参数说明：
             *
             * -f：指定运行的 locust 脚本
             * -u/--users：并发 Locust 用户的峰值数量
             * -r/--spawn-rate：以（每秒用户数）生成用户的速率
             * -t/--run_time：在指定的时间后停止，例如（300s、20m、3h、1h30m 等）
             * --headless：禁用 Web 界面（使用终端）），并立即开始测试
             * --autostart: 立即开始测试（不禁用 Web UI）
             * --autoquit: n秒后完全退出 locust，例如 --autoquit 5
             * -H/--host=：url 前缀。或者在代码中写 host = 'https://www.cnblogs.com'
             * -P/--web-port：运行 web 的端口号
             * --master：分布式运行时的主节点
             * --expect-workers：指定需要有多少个工作节点，仅与 --master 一起使用
             * --web-host：指定 locust web 页面绑定的主机地址，默认为空字符串表示绑定到所有接口
             * --worker：分布式运行时，使用此参数指定当前为工作节点
             * --master-host：指定 locust 分布式执行的主节点地址，仅当与--worker参数同时运行时有效
             */
            // 启动主节点 - master
            String generatorIp; // 当前 generator IP，如果这个 IP = masterIp 则代表这个 generator 是 master
            JSONObject extra = JSON.parseObject(generator.getExtra());
            if (extra.getBoolean("isJumpServer")) {
                generatorIp = extra.getString("serverIp");;
            } else {
                generatorIp = generator.getIp();
            }
            if (masterIp != null && masterIp.equalsIgnoreCase(generatorIp)) {
                Scenario scenario = perfStep.getScenario(); // 性能测试场景
                logUtil.sendStepLog(RunStatus.INFO, "", "", String.format("性能测试场景 scenario「logMode：%s，controlMode：%s，loadTestMode：%s」", scenario.getLogMode().getName(), scenario.getControlMode().getName(), scenario.getLoadTestMode().getName()));

                /**
                 * 根据压测模式设置启动命令可选参数
                 *
                 * start - 起始并发数
                 * step - 并发数步长
                 * stepDuration - 步长持续时长
                 * max - 最大并发数
                 * duration - 持续时长
                 */
                String masterCmdOptions = ""; // master 启动命令可选参数
                JSONObject scenarioContent = JSONObject.parseObject(scenario.getContent()); // 性能测试场景内容
                if (scenario.getLoadTestMode() == LoadTestMode.CONCURRENT) { // 并发模式
                    masterCmdOptions = String.format(" --users %s --spawn-rate %s --run-time %ss ", scenarioContent.getIntValue("count"), scenarioContent.getIntValue("count"), scenarioContent.getIntValue("duration"));
                    logUtil.sendStepLog(RunStatus.INFO, "", "", String.format("压测模式 loadTestMode「持续时长(s)：%s，并发数：%s」", scenarioContent.getIntValue("duration"), scenarioContent.getIntValue("count")));
                } else if (scenario.getLoadTestMode() == LoadTestMode.ROUND_ROBIN) { // 轮次模式
                    throw new MyException("暂不支持的压测模式 LoadTestMode: " + scenario.getLoadTestMode());
                } else if (scenario.getLoadTestMode() == LoadTestMode.RAMP_UP) { // 阶梯模式
                    masterCmdOptions = String.format(" --users %s --spawn-rate %s --run-time %ss ", scenarioContent.getIntValue("start"), scenarioContent.getIntValue("start"), scenarioContent.getIntValue("duration"));
                    logUtil.sendStepLog(RunStatus.INFO, "", "", String.format("压测模式 loadTestMode「起始并发数：%d，并发数步长：%d，步长持续时长(s)：%d，最大并发数：%d，持续时长(s)：%d」", scenarioContent.getIntValue("start"), scenarioContent.getIntValue("step"), scenarioContent.getIntValue("stepDuration"), scenarioContent.getIntValue("max"), scenarioContent.getIntValue("duration")));
                } else if (scenario.getLoadTestMode() == LoadTestMode.ERROR_RATE) { // 错误率模式
                    throw new MyException("暂不支持的压测模式 LoadTestMode: " + scenario.getLoadTestMode());
                } else if (scenario.getLoadTestMode() == LoadTestMode.RESPONSE_TIME) { // 响应时间模式
                    throw new MyException("暂不支持的压测模式 LoadTestMode: " + scenario.getLoadTestMode());
                } else if (scenario.getLoadTestMode() == LoadTestMode.THROUGHPUT) { // 每秒应答数模式
                    throw new MyException("暂不支持的压测模式 LoadTestMode: " + scenario.getLoadTestMode());
                } else {
                    throw new MyException("不识别的压测模式 LoadTestMode: " + scenario.getLoadTestMode());
                }

                String cmdStr = String.format("nohup locust -f \"perf_%s_%d_%d.py\" --autostart --autoquit 30 --master --web-host=\"%s\" --expect-workers=%s --web-port 56287 %s > nohup_master.out 2>&1 &", perfStep.getName(), logUtil.resultId, logUtil.caseId, masterIp, workerCount, masterCmdOptions);
                logUtil.sendStepLog(RunStatus.INFO, "", "", "运行性能测试脚本主节点（master）：" + cmdStr);
                logger.info("运行性能测试脚本主节点（master）：{}", cmdStr);
                sshResponse = pythonHelper.executeCommand(cmdStr);
                i = 0;
                while (true) {
                    sshResponse = pythonHelper.executeCommand("ps -ef | grep --color=never \"locust\"");
                    if (sshResponse.getBody().contains(String.format("perf_%s_%d_%d.py", perfStep.getName(), logUtil.resultId, logUtil.caseId)) && sshResponse.getBody().contains("--master")) {
                        break;
                    }
                    if (i++ > 30) {
                        throw new MyException(String.format("运行性能测试脚本主节点（master）失败，超过 9s master 仍未正常启动。执行 %s 命令失败，错误信息：%s", sshResponse.getCmd(), sshResponse.getBody()));
                    }
                    try {
                        Thread.sleep(300);
                    } catch (Exception e) {
                    }
                }
                i = 0;
                while (true) {
                    sshResponse = pythonHelper.executeCommand("netstat -nap | grep --color=never \":5557 \" | grep --color=never \"LISTEN\"");
                    if (sshResponse.getBody().contains(":5557")) {
                        break;
                    }
                    if (i++ > 30) {
                        SSHResponse tempSSHResponse = pythonHelper.executeCommand("cat nohup_master.out");
                        if (tempSSHResponse.getBody().contains("Error")) {
                            throw new MyException(String.format("运行性能测试脚本主节点（master）失败，locust 主节点（master）超过 9s 仍无法启动 5557 监听端口。\n\n%s", tempSSHResponse.getBody()));
                        } else {
                            throw new MyException(String.format("运行性能测试脚本主节点（master）失败，locust 主节点（master）超过 9s 仍无法启动 5557 监听端口。执行 %s 命令失败，错误信息：%s ==== 看到这个异常有可能是脚本启动失败，未能捕捉到并抛出，需要排查下代码 =====", sshResponse.getCmd(), sshResponse.getBody()));
                        }
                    }
                    try {
                        Thread.sleep(300);
                    } catch (Exception e) {
                    }
                }
                logger.info("运行性能测试脚本主节点（master），成功。{} ", cmdStr);
                logUtil.sendStepLog(RunStatus.INFO, "", "", "运行性能测试脚本主节点（master），成功");
            }

            // 启动工作节点 - worker
            String cmdStr = String.format("nohup locust -f \"perf_%s_%d_%d.py\" --worker --master-host='%s' > nohup_worker.out 2>&1 &", perfStep.getName(), logUtil.resultId, logUtil.caseId, masterIp);
            logUtil.sendStepLog(RunStatus.INFO, "", "", "运行性能测试脚本工作节点（worker）：" + cmdStr);
            logger.info("运行性能测试脚本工作节点（worker）：{}", cmdStr);
            sshResponse = pythonHelper.executeCommand(cmdStr);
            i = 0;
            while (true) {
                sshResponse = pythonHelper.executeCommand("ps -ef | grep --color=never \"locust\"");
                if (sshResponse.getBody().contains(String.format("perf_%s_%d_%d.py", perfStep.getName(), logUtil.resultId, logUtil.caseId)) && sshResponse.getBody().contains("--worker")) {
                    break;
                }
                if (i++ > 30) {
                    throw new MyException(String.format("运行性能测试脚本工作节点（worker）失败，超过 9s worker 仍未正常启动。执行 %s 命令失败，错误信息：%s", sshResponse.getCmd(), sshResponse.getBody()));
                }
                try {
                    Thread.sleep(300);
                } catch (Exception e) {
                }
            }
            i = 0;
            while (true) {
                sshResponse = pythonHelper.executeCommand("netstat -an | grep --color=never \":5557 \" |grep --color=never ESTABLISHED | awk {'print $5'}");
                if (sshResponse.getBody().contains(":5557")) {
                    break;
                }
                if (i++ > 30) {
                    SSHResponse tempSSHResponse = pythonHelper.executeCommand("cat nohup_worker.out");
                    if (tempSSHResponse.getBody().contains("Error")) {
                        throw new MyException(String.format("运行性能测试脚本工作节点（worker）失败。locust 工作节点（worker）超过 9s 仍无法连接到主节点（master）的 5557 端口，worker: %s, master: %s \n\n%s", generatorIp, masterIp, tempSSHResponse.getBody()));
                    } else {
                        throw new MyException(String.format("运行性能测试脚本工作节点（worker）失败。locust 工作节点（worker）超过 9s 仍无法连接到主节点（master）的 5557 端口，worker: %s, master: %s ==== 看到这个异常有可能是脚本启动失败，未能捕捉到并抛出，需要排查下代码 =====", generatorIp, masterIp));
                    }
                }
                try {
                    Thread.sleep(300);
                } catch (Exception e) {
                }
            }
            logger.info("运行性能测试脚本工作节点（worker），成功。{} ", cmdStr);
            logUtil.sendStepLog(RunStatus.INFO, "", "", "运行性能测试脚本工作节点（worker），成功");
        } catch (JSchException e) {
            throw new BreakException(e.toString());
        } catch (IOException e) {
            throw new BreakException(e.toString());
        } catch (TimeoutException e) {
            throw new BreakException(e.toString());
        } finally {
            if (ssh != null) {
                try {
                    ssh.disconnect(); // 释放ssh链接
                } catch (TimeoutException e) {
                    e.printStackTrace();
                }
            }
        }

        return true;
    }

    /**
     * 强制停止性能测试脚本 Locust - linux
     * @param generator 压力机（load generators）Entity 模型
     * @return
     */
    private boolean forceStopLocustLinux(Generator generator) {
        SSH ssh = null;
        try {
            ssh = connectSshServer(generator, 0); // 连接 ssh 服务器
            PythonHelper pythonHelper = new Python3Helper(perfConf.getCondaConfig(), PythonVersion.PY_3_6_8, ssh, generator.getWorkDir(), perfConf.getDebug());

            // 强制停止 locust
            logUtil.sendStepLog(RunStatus.INFO, "", "", "强制停止 locust: pkill locust");
            logger.info("执行 pkill locust 杀掉 locust");
            pythonHelper.executeCommand("pkill locust");
            int i = 0;
            while (true) {
                SSHResponse sshResponse = pythonHelper.executeCommand("ps -ef | grep --color=never \"locust\"");
                if (!sshResponse.getBody().contains("--master") && !sshResponse.getBody().contains("--worker")) {
                    break;
                }
                if (i == 50) {
                    pythonHelper.executeCommand("pkill -9 locust"); // 在第15秒的时候补一次 pkill -9 杀进程
                }
                if (i++ > 100) {
                    throw new MyException(String.format("pkill locust 失败，超过 30s locust 仍然在运行。执行 %s 命令失败，错误信息：%s", sshResponse.getCmd(), sshResponse.getBody()));
                }
                try {
                    Thread.sleep(300);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
            logger.info("执行 pkill locust 杀掉 locust，成功");
            logUtil.sendStepLog(RunStatus.INFO, "", "", "强制停止 locust，成功");
        } catch (JSchException e) {
            throw new BreakException(e.toString());
        } catch (IOException e) {
            throw new BreakException(e.toString());
        } catch (TimeoutException e) {
            throw new BreakException(e.toString());
        } finally {
            if (ssh != null) {
                try {
                    ssh.disconnect(); // 释放ssh链接
                } catch (TimeoutException e) {
                    e.printStackTrace();
                }
            }
        }

        return true;
    }

    /**
     * 收集性能测试结果
     * @param masterIp 主节点（master）ip
     * @param perfStep 性能测试用例步骤
     * @return
     */
    private boolean collectResults(String masterIp, PerfStep perfStep) {
        Scenario scenario = perfStep.getScenario(); // 性能测试场景
        JSONObject scenarioContent = JSONObject.parseObject(scenario.getContent()); // 性能测试场景内容

        int currentUserCount = 0; // 当前并发用户数
        long timeout = System.currentTimeMillis() + scenarioContent.getIntValue("duration") * 1000 + 25 * 1000; // 超时时间 = 超时时间 + （locust 自动退出时间 30s -5s）

        // 设置 下次修改并发数时间
        long nextStepTime = 0;
        if (scenarioContent.getIntValue("stepDuration") != 0) {
            nextStepTime = System.currentTimeMillis() + scenarioContent.getIntValue("stepDuration") * 1000;
        }

        long getReportTime = System.currentTimeMillis(); // 获取测试报告时间

        // 超时时间内，收集日志等
        while (System.currentTimeMillis() <= timeout) {
            // 用户点击 中断，退出日志收集
            if (this.status == RunStatus.BROKEN) {
                logger.info("用户主动 中断 性能测试任务");
                break;
            }

            if (System.currentTimeMillis() >= getReportTime) {
                getReportTime = System.currentTimeMillis() + 5000; // 下次获取报告时间为下一个 5s

                logger.info("收集性能测试结果 http://" + masterIp + ":56287" + PerfLogType.LOG_TASKS.getName());
                String url = "http://" + masterIp + ":56287" + PerfLogType.LOG_TASKS.getName();
                ResponseEntity<String> result = RestTemplateHelper.httpGet(url, new HashMap<>(), new HashMap<>(), String.class);
                JSONObject jb = JSONObject.parseObject(result.getBody());
                logUtil.sendStepLog(RunStatus.INFO, Integer.toString(PerfLogType.LOG_TASKS.getValue()), "", jb.toJSONString());

                logger.info("收集性能测试结果 http://" + masterIp + ":56287" + PerfLogType.LOG_EXCEPTIONS.getName());
                url = "http://" + masterIp + ":56287" + PerfLogType.LOG_EXCEPTIONS.getName();
                result = RestTemplateHelper.httpGet(url, new HashMap<>(), new HashMap<>(), String.class);
                jb = JSONObject.parseObject(result.getBody());
                logUtil.sendStepLog(RunStatus.INFO, Integer.toString(PerfLogType.LOG_EXCEPTIONS.getValue()), "", jb.toJSONString());


                logger.info("收集性能测试结果 http://" + masterIp + ":56287" + PerfLogType.LOG_REQUESTS.getName());
                url = "http://" + masterIp + ":56287" + PerfLogType.LOG_REQUESTS.getName();
                result = RestTemplateHelper.httpGet(url, new HashMap<>(), new HashMap<>(), String.class);
                jb = JSONObject.parseObject(result.getBody());
                currentUserCount = jb.getIntValue("user_count"); // 当前并发用户数
                logUtil.sendStepLog(RunStatus.INFO, Integer.toString(PerfLogType.LOG_REQUESTS.getValue()), "", jb.toJSONString());

                JSONObject jsonObject = JSONObject.parseObject(result.getBody());
                String state = jsonObject.getString("state");
                logger.info("性能测试脚本运行状态 state: {}", state);
                if ("stopped".equalsIgnoreCase(state) || "stopping".equalsIgnoreCase(state)) {
                    logger.info("收集性能测试结果 - csv http://" + masterIp + ":56287" + PerfLogType.CSV_REQUESTS.getName());
                    url = "http://" + masterIp + ":56287" + PerfLogType.CSV_REQUESTS.getName();
                    result = RestTemplateHelper.httpGet(url, new HashMap<>(), new HashMap<>(), String.class);
                    logUtil.sendStepLog(RunStatus.INFO, Integer.toString(PerfLogType.CSV_REQUESTS.getValue()), "", result.getBody());

                    logger.info("收集性能测试结果 - csv http://" + masterIp + ":56287" + PerfLogType.CSV_FAILURES.getName());
                    url = "http://" + masterIp + ":56287" + PerfLogType.CSV_FAILURES.getName();
                    result = RestTemplateHelper.httpGet(url, new HashMap<>(), new HashMap<>(), String.class);
                    logUtil.sendStepLog(RunStatus.INFO, Integer.toString(PerfLogType.CSV_FAILURES.getValue()), "", result.getBody());

                    logger.info("收集性能测试结果 - csv http://" + masterIp + ":56287" + PerfLogType.CSV_EXCEPTIONS.getName());
                    url = "http://" + masterIp + ":56287" + PerfLogType.CSV_EXCEPTIONS.getName();
                    result = RestTemplateHelper.httpGet(url, new HashMap<>(), new HashMap<>(), String.class);
                    logUtil.sendStepLog(RunStatus.INFO, Integer.toString(PerfLogType.CSV_EXCEPTIONS.getValue()), "", result.getBody());

                    logger.info("locust 性能测试脚本运行结束。state:{}", state);
                    break;
                }
            }

            /**
             * 根据压测模式重新修改并发用户数等参数
             *
             * start - 起始并发数
             * step - 并发数步长
             * stepDuration - 步长持续时长
             * max - 最大并发数
             * duration - 持续时长
             */
            if (scenario.getLoadTestMode() == LoadTestMode.CONCURRENT) { // 并发模式，什么都不需要处理等待 duration 后结束
            } else if (scenario.getLoadTestMode() == LoadTestMode.ROUND_ROBIN) { // 轮次模式
                throw new MyException("暂不支持的压测模式 LoadTestMode: " + scenario.getLoadTestMode());
            } else if (scenario.getLoadTestMode() == LoadTestMode.RAMP_UP) { // 阶梯模式
                if (nextStepTime != 0 && System.currentTimeMillis() >= nextStepTime) { // 当前时间 大于 下次修改并发数时间，改变当前并发数
                    if (currentUserCount < scenarioContent.getIntValue("max")) { // 当前并发数 小于 最大并发数，不用修改
                        nextStepTime = System.currentTimeMillis() + scenarioContent.getIntValue("stepDuration") * 1000; // 设置 下次修改并发数时间

                        // 设置 修改并发用户数
                        int changeUserCount = currentUserCount + scenarioContent.getIntValue("step");
                        if (changeUserCount > scenarioContent.getIntValue("max")) { // 修改并发用户数 大于 最大并发数
                            changeUserCount = scenarioContent.getIntValue("max"); // 修改并发用户数 为 最大并发数
                        }

                        // 发送修改用户数请求
                        Map<String, Object> bodyParams = new HashMap<>();
                        bodyParams.put("user_count", changeUserCount);
                        bodyParams.put("spawn_rate", changeUserCount);
                        ResponseEntity<String> result = RestTemplateHelper.httpPostFormBody("http://" + masterIp + ":56287/swarm", new HashMap<>(), new HashMap<>(), bodyParams, String.class);
                        JSONObject jb = JSONObject.parseObject(result.getBody());
                        if (jb.getBooleanValue("success")) {
                            logger.info(LoadTestMode.RAMP_UP.getName() + " 修改并发用户数为：" + changeUserCount + "，成功");
                            logUtil.sendStepLog(RunStatus.INFO, "", "", LoadTestMode.RAMP_UP.getName() + " 修改并发用户数为：" + changeUserCount + "，成功");
                        } else {
                            logger.info(LoadTestMode.RAMP_UP.getName() + " 修改并发用户数为：" + changeUserCount + "，失败");
                            logUtil.sendStepLog(RunStatus.WARN, "", "", LoadTestMode.RAMP_UP.getName() + " 修改并发用户数为： " + changeUserCount + "，失败");
                        }
                    } else {
                        nextStepTime = 0;
                    }
                }
            } else if (scenario.getLoadTestMode() == LoadTestMode.ERROR_RATE) { // 错误率模式
                throw new MyException("暂不支持的压测模式 LoadTestMode: " + scenario.getLoadTestMode());
            } else if (scenario.getLoadTestMode() == LoadTestMode.RESPONSE_TIME) { // 响应时间模式
                throw new MyException("暂不支持的压测模式 LoadTestMode: " + scenario.getLoadTestMode());
            } else if (scenario.getLoadTestMode() == LoadTestMode.THROUGHPUT) { // 每秒应答数模式
                throw new MyException("暂不支持的压测模式 LoadTestMode: " + scenario.getLoadTestMode());
            } else {
                throw new MyException("不识别的压测模式 LoadTestMode: " + scenario.getLoadTestMode());
            }

            try {
                Thread.sleep(1000);
            } catch (Exception e) {
            }
        }
        return true;
    }
}